-
Notifications
You must be signed in to change notification settings - Fork 0
refactor(llc)!: update Emitter architecture and implement Stream directly #26
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Refactors `SharedEmitter` and `StateEmitter` to implement `Stream<T>` directly, removing the need for a separate `.stream` getter. This allows emitters to be used natively as streams while maintaining their specific emitter functionality. Key changes: - `SharedEmitter` and `StateEmitter` now implement `Stream<T>`. - Moved `EventEmitter` to a generic utility implementation in `src/utils/event_emitter.dart`. - Introduced `StreamEvent` base interface and `EventResolver` for type-safe event transformations. - Added `asSharedEmitter()` and `asStateEmitter()` extension methods to provide read-only views. - Added state manipulation helpers: `update`, `getAndUpdate`, and `updateAndGet` to `MutableStateEmitter`. - Added `hasListener` and `isClosed` properties to the emitter interfaces. - Updated `WsEvent` to implement `StreamEvent`.
📝 WalkthroughWalkthroughSharedEmitter and StateEmitter now implement Stream directly (stream getters removed). Introduced StreamEvent/EventResolver, a generic MutableEventEmitter with resolver chain, mutable/read-only emitter variants and Subject-backed implementations, ws event typing updates, export changes, and new emitter/state tests. Changes
Sequence Diagram(s)sequenceDiagram
participant Producer
participant MutableEventEmitter
participant ResolverChain as Resolvers...
participant Subject as _shared
participant Listener
Producer->>MutableEventEmitter: emit(event)
MutableEventEmitter->>ResolverChain: apply resolvers in order
alt resolver returns non-null
ResolverChain-->>MutableEventEmitter: resolvedEvent
MutableEventEmitter->>Subject: add(resolvedEvent)
else none matched
ResolverChain-->>MutableEventEmitter: null
MutableEventEmitter->>Subject: add(originalEvent)
end
Subject->>Listener: deliver(event)
Note right of MutableEventEmitter: supports replay / sync / close
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. 📜 Recent review detailsConfiguration used: defaults Review profile: CHILL Plan: Pro Disabled knowledge base sources:
📒 Files selected for processing (2)
🔇 Additional comments (2)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #26 +/- ##
==========================================
+ Coverage 34.70% 40.39% +5.68%
==========================================
Files 53 53
Lines 1066 1057 -9
==========================================
+ Hits 370 427 +57
+ Misses 696 630 -66 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
packages/stream_core/lib/src/utils/shared_emitter.dart (1)
169-174: Consider documenting behavior when emitter closes without matching event.
whereType<E>().firstthrowsStateErrorif the stream closes without emitting an event of typeE. Consider adding documentation about this edge case./// Throws a [TimeoutException] if [timeLimit] is exceeded before receiving /// an event of type [E]. + /// + /// Throws a [StateError] if the emitter closes before an event of type [E] + /// is received.
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (9)
packages/stream_core/CHANGELOG.mdpackages/stream_core/lib/src/utils.dartpackages/stream_core/lib/src/utils/event_emitter.dartpackages/stream_core/lib/src/utils/shared_emitter.dartpackages/stream_core/lib/src/utils/state_emitter.dartpackages/stream_core/lib/src/ws.dartpackages/stream_core/lib/src/ws/client/stream_web_socket_client.dartpackages/stream_core/lib/src/ws/events/event_emitter.dartpackages/stream_core/lib/src/ws/events/ws_event.dart
💤 Files with no reviewable changes (2)
- packages/stream_core/lib/src/ws.dart
- packages/stream_core/lib/src/ws/events/event_emitter.dart
🔇 Additional comments (16)
packages/stream_core/lib/src/utils.dart (1)
4-4: LGTM! Export addition aligns with the new event emitter utilities.The export correctly exposes the new
StreamEvent,EventResolver,EventEmitter, andMutableEventEmittertypes to the public API surface.packages/stream_core/lib/src/ws/client/stream_web_socket_client.dart (1)
77-78: LGTM! Type parameters enhance type safety.The addition of
<WsEvent>generic constraints ensures that onlyWsEventtypes can be emitted and listened to, providing compile-time type safety. This aligns well with the newEventResolver<WsEvent>support introduced in the constructor.packages/stream_core/CHANGELOG.md (1)
1-14: LGTM! Comprehensive and well-structured changelog.The breaking changes are clearly documented, and the feature list accurately reflects the refactoring. The distinction between breaking changes and new features helps users understand migration requirements.
packages/stream_core/lib/src/utils/event_emitter.dart (3)
13-13: LGTM! Clean marker interface for event types.The
StreamEventmarker interface provides a clear contract for types that can be emitted through the event system.
26-26: LGTM! EventResolver signature is well-designed.The resolver function signature is clear and the documentation effectively explains the null-return convention for passing events to the next resolver in the chain.
60-85: LGTM! Resolver chain implementation is correct.The
emit()method correctly applies resolvers in sequence, emitting the first non-null transformation result. The fallback to emitting the original event ensures no events are lost when no resolver matches.One consideration for future enhancement: The current design always emits an event (either transformed or original). If use cases emerge where events should be filtered out entirely, consider adding a special sentinel value or a separate filtering mechanism.
packages/stream_core/lib/src/ws/events/ws_event.dart (1)
5-10: This change does not introduce any breaking changes.WsEvent has no subclasses in the codebase, and there are no equality comparisons on WsEvent instances. The transition from
extends Equatabletoimplements StreamEventis safe and aligns with the event architecture, whereStreamEventserves as a marker interface for the event routing system.packages/stream_core/lib/src/utils/state_emitter.dart (4)
8-19: Clean read-only interface design.The minimal surface with only
valuegetter aligns well with the Kotlin StateFlow analogy and encapsulation principles.
40-54: Well-structured mutable interface with clear conflation semantics.The documentation properly explains that setting a value equal to the current value does nothing, which is important for users to understand the behavior.
74-111: Solid implementation with proper conflation and lifecycle management.The use of
StreamView<T>for delegation, equality-based conflation in the setter, and proper forwarding ofhasListener/isClosed/close()to the backing subject is clean and idiomatic.
113-165: Useful convenience methods following established patterns.The extension provides idiomatic helper methods similar to Kotlin's
getAndUpdate/updateAndGet. TheasStateEmitter()approach for type narrowing is a common pattern that provides compile-time read-only semantics.packages/stream_core/lib/src/utils/shared_emitter.dart (5)
7-22: Good architectural choice implementingStream<T>directly.Eliminating the
.streamgetter and havingSharedEmitterimplementStream<T>directly simplifies the API and aligns with the PR's stated objectives.
44-69: Well-designed emission API with throwing and non-throwing variants.The distinction between
emit(throws on failure) andtryEmit(returns bool) provides flexibility for different use cases. Documentation clearly explains the behavioral difference.
101-138: Clean implementation with proper replay validation.The switch expression in
_createSubjectelegantly handles the replay configuration, and throwingArgumentErrorfor negative replay count is appropriate defensive programming. TheStreamView<T>delegation pattern keeps the implementation concise.
140-155: Consistent type-narrowing pattern.The
asSharedEmitter()extension follows the same pattern asasStateEmitter(), maintaining API consistency across the emitter types.
176-187: Clean type-filtered event handler.The
on<E>method provides a convenient way to subscribe to specific event types, similar to event handler patterns in other reactive frameworks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
packages/stream_core/test/utils/shared_emitter_test.dart (1)
452-477: Consider emission order assumptions in merge tests.The test at line 473 expects values
[1, 101]in that specific order after settingemitter1.value = 1andemitter2.value = 101sequentially. While this likely works in practice, the order might be implementation-dependent. If flakiness occurs, consider usingcontainsAllor sorting before comparison.
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (3)
packages/stream_core/test/utils/event_emitter_test.dartpackages/stream_core/test/utils/shared_emitter_test.dartpackages/stream_core/test/utils/state_emitter_test.dart
🔇 Additional comments (13)
packages/stream_core/test/utils/state_emitter_test.dart (5)
6-71: LGTM! Well-structured state-specific tests.The test coverage for state-specific functionality is comprehensive, including initialization, emission to new subscribers, value updates, duplicate value conflation, and error handling after close. The conflation test correctly validates that consecutive duplicate values are skipped.
73-92: LGTM! Correct late subscriber semantics.The test correctly validates that late subscribers receive only the current value (3) rather than the emission history, which is the expected behavior for StateEmitter.
94-123: LGTM! Comprehensive atomic update coverage.The tests correctly validate all three atomic update helpers (
update,getAndUpdate,updateAndGet) with clear semantics for each operation.
125-145: Verify the initial value emission behavior in sync mode.The test shows that even in sync mode, the initial value is emitted asynchronously (requiring
await pumpEventQueue()), while subsequent emissions are synchronous. Ensure this mixed behavior is intentional and documented, as users might expect fully synchronous behavior whensync: trueis specified.
147-170: LGTM! Property tests are correct.The tests properly validate
hasListenerandisClosedproperties through their lifecycle transitions.packages/stream_core/test/utils/shared_emitter_test.dart (5)
9-98: LGTM! Comprehensive basic functionality coverage.The tests thoroughly cover emission behavior, multiple listeners, close semantics, and the distinction between
tryEmit(returns false on error) andemit(throws on error). The test at lines 87-97 is particularly good at demonstrating this distinction.
100-153: LGTM! Type filtering and timeout handling are well-tested.The tests correctly validate type-filtered event handling with
on<E>()andwaitFor<E>(), including the timeout scenario withTimeoutException.
155-299: LGTM! Excellent Stream interface compatibility coverage.The comprehensive test suite validates that the emitter properly implements the Stream interface, exercising all major Stream operators (
where,map,take,skip,distinct,first,firstWhere) and confirmingisBroadcastreturns true.
337-342: LGTM! Good validation of invalid replay parameter.The test correctly validates that negative replay values throw an
ArgumentError, preventing invalid configuration.
390-428: LGTM! Clear behavioral distinction between emitter types.The comparison tests effectively demonstrate the key difference: SharedEmitter only emits to active listeners, while StateEmitter always provides the current value to new subscribers. This is valuable documentation through tests.
packages/stream_core/test/utils/event_emitter_test.dart (3)
4-23: LGTM! Well-designed test event hierarchy.The test event class structure effectively supports testing resolver transformations and type-filtered event handling. The inheritance from
BaseEventallows validating that subtype events are correctly received by base type listeners.
27-156: LGTM! Comprehensive resolver functionality coverage.The tests thoroughly validate the EventResolver system:
- Successful transformations with resolver chains
- Fallback behavior when no resolvers match
- First-match-wins semantics for multiple resolvers
- Cross-type transformations with computed payloads
- Conditional transformations based on event properties
The comment at lines 153-154 correctly explains the type hierarchy behavior where
BaseEventlisteners receiveTransformedEventinstances.
158-258: LGTM! Proper validation of inherited SharedEmitter behavior.The tests confirm that
MutableEventEmittercorrectly inherits allSharedEmittercapabilities (multiple listeners, type filtering, waitFor, replay, sync mode, tryEmit) while adding resolver-based transformation. This ensures backward compatibility and feature completeness.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In @packages/stream_core/test/utils/state_emitter_test.dart:
- Around line 148-159: The test creates a MutableStateEmitter<int> named emitter
but never closes it; after creating the emitter (before subscribing) call
addTearDown(emitter.close) or ensure you call await emitter.close() at the end
of the test to avoid leaking resources—locate the test using
MutableStateEmitter, add addTearDown(emitter.close) immediately after
instantiation (or await emitter.close() after subscription.cancel()), and keep
existing assertions around hasListener and subscription.cancel as-is.
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (2)
packages/stream_core/test/utils/shared_emitter_test.dartpackages/stream_core/test/utils/state_emitter_test.dart
🚧 Files skipped from review as they are similar to previous changes (1)
- packages/stream_core/test/utils/shared_emitter_test.dart
🔇 Additional comments (6)
packages/stream_core/test/utils/state_emitter_test.dart (6)
1-71: LGTM! Comprehensive coverage of state-specific functionality.The tests properly cover initial value behavior, emission mechanics, value setter semantics, conflation of duplicate values, and error handling. Resource cleanup is handled correctly via
addTearDownor explicitclose()calls.
73-92: LGTM! Correctly validates late-subscription semantics.The test properly verifies that StateEmitter follows BehaviorSubject semantics—late subscribers receive only the current value, not the emission history. This is a key distinction from replay-based emitters.
94-123: LGTM! Atomic update methods thoroughly tested.The test suite correctly validates all three atomic update helpers (
update,getAndUpdate,updateAndGet), verifying both return values and state mutations. Resource cleanup is properly handled.
125-145: LGTM! Sync mode behavior correctly validated.The test properly distinguishes between the initial BehaviorSubject emission (awaited with
pumpEventQueue) and subsequent synchronous emissions. This correctly validates that thesyncflag affects new emissions while respecting the underlying stream's initial behavior.
161-169: LGTM! isClosed property correctly tested.The test properly validates the
isClosedproperty lifecycle and explicitly closes the emitter to avoid resource leaks.
172-192: LGTM! Extension behavior thoroughly validated.The test correctly verifies that
asStateEmitter()returns a read-only view that reflects updates from the mutable emitter, with proper type checking and resource cleanup.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
packages/stream_core/lib/src/utils/shared_emitter.dart (1)
49-52: Consider documenting default values in the interface.The factory constructor parameters lack default values in the interface declaration, but the implementation provides
replay = 0andsync = falseas defaults (line 107-109). While valid Dart syntax, documenting the default behavior in the doc comments would improve API clarity.📝 Suggested documentation enhancement
/// Creates a [MutableSharedEmitter]. /// /// Supports synchronous or asynchronous event emission via [sync], and - /// can optionally replay the last [replay] events to new subscribers. + /// can optionally replay the last [replay] events to new subscribers. + /// + /// By default, [replay] is 0 (no replay) and [sync] is false (asynchronous). factory MutableSharedEmitter({ int replay, bool sync, }) = SharedEmitterImpl<T>;
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (2)
packages/stream_core/lib/src/utils/shared_emitter.dartpackages/stream_core/test/utils/shared_emitter_test.dart
🚧 Files skipped from review as they are similar to previous changes (1)
- packages/stream_core/test/utils/shared_emitter_test.dart
🔇 Additional comments (6)
packages/stream_core/lib/src/utils/shared_emitter.dart (6)
7-22: LGTM! Clean architectural improvement.The decision to implement
Stream<T>directly is a significant improvement that simplifies the API surface. The addition ofisClosedprovides necessary lifecycle visibility, and the documentation clearly describes the read-only nature and references related types.
107-120: LGTM! Clean factory pattern with proper validation.The constructor design is well-structured:
- Clear separation between public and private constructors
- Static
_createSubjecthelper provides type-appropriate Subject instantiation- Switch expression elegantly handles replay logic with proper validation for negative values
124-138: LGTM! Proper delegation pattern.The method implementations correctly delegate to the underlying
Subject:
emitdirectly calls_shared.addtryEmitelegantly wrapsemitin safe execution (assumingrunSafelySyncis verified)- Property getters and
closeproperly forward to the backing subject
140-155: LGTM! Excellent encapsulation pattern.The extension provides a clean way to expose a read-only view of a mutable emitter. The documentation example clearly demonstrates the intended usage pattern for service boundaries, and the implementation correctly relies on type coercion.
158-191: LGTM! Well-designed convenience methods.Both extension methods provide idiomatic Dart stream operations:
waitFor<E>elegantly combineswhereTypefiltering with optional timeout and proper error documentationon<E>provides type-safe event handler registration with correct return type for subscription managementThe implementations leverage Dart's stream filtering capabilities effectively.
5-5: Import and function signature are correct.The
result.dartimport is valid. TherunSafelySyncfunction exists with the correct signature—it accepts a callback and returns aResult<R>type with anisSuccessboolean property. The usage at line 128 is correct:bool tryEmit(T value) => runSafelySync(() => emit(value)).isSuccess;
Description of the pull request
Part of GetStream/stream-feeds-flutter#79
Summary by CodeRabbit
New Features
Refactor
Tests
✏️ Tip: You can customize this high-level summary in your review settings.